DynamoDB ストリームから Lambda 関数を呼び出して Kinesis Firehose を経由して S3 バケットに出力してみた
コンバンハ、千葉(幸)です。
DynamoDB には DynamoDB ストリームという機能があり、DynamoDB テーブルのアイテムに変更があった場合にそれをキャプチャすることができます。
DynamoDB ストリームをイベントとして Lambda 関数を呼び出すことを俗に DynamoDB トリガーと呼びます。
また、DynamoDB テーブルではアイテムに有効期限(TTL)を設けて自動的な削除ができます。TTL により削除されたアイテムも DynamoDB ストリームのキャプチャ対象となるため、「期限切れで削除されたアイテムのみ Lambda 関数を用いて S3 にアーカイブ」ということもできます。
今回は、
- DynamoDB ストリームで新規に追加されたアイテムをキャプチャし、
- それをトリガーに Lambda 関数を呼び出し、
- Kinesis Data Firehose 配信ストリームを通じて S3 バケットに出力する
というパターンを試してみます。
以下のブログを参考にします。
全体像は以下です。
目次
- やってみた
- 1. Lambda 関数用コードの S3 へのアップロード
- 2. CloudFormationのデプロイ
- 3. DynamoDB テーブルへのアイテム追加
- 4. S3 バケットの確認
- 5. Athena を用いた分析
- 終わりに
やってみた
今回のステップは以下の通りです。
- Lambda 関数用コードの S3 へのアップロード
- CloudFormation のデプロイ
- DynamoDB テーブルへのアイテム追加
- S3 バケットの確認
- Athena を用いた分析
使用するスクリプト、 CloudFormation テンプレートはこちらにあるものを使用します。
1. Lambda 関数用コードの S3 へのアップロード
リソースは基本的に次のステップで CloudFormation により一括作成するのですが、一点だけ事前準備が必要です。Lambda 関数にセットするコードを S3 バケットから読み取るようにされているため、そのコードをアップロードします。
以下のスクリプトを zip 化して任意の S3 バケットにアップロードします。
######################################################################################### # Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Permission is hereby granted, free of charge, to any person obtaining a copy of this # software and associated documentation files (the "Software"), to deal in the Software # without restriction, including without limitation the rights to use, copy, modify, # merge, publish, distribute, sublicense, and/or sell copies of the Software, and to # permit persons to whom the Software is furnished to do so. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ######################################################################################### import os, json, base64, boto3 firehose = boto3.client('firehose') print('Loading function') def recToFirehose(streamRecord): ddbRecord = streamRecord['NewImage'] toFirehose = {} for c in ddbRecord: toFirehose = next(iter(ddbRecord.values())) jddbRecord = json.loads(ddbRecord['info']['S']) # Transform the record a bit try: rating = jddbRecord['rating'] except: rating = 0 try: actors = jddbRecord['actors'] except: actors = [' ',' '] actor1 = actors[0] try: actor2 = actor[1] except: actor2 = ' ' try: genres = jddbRecord['genres'] except: genres = ['',''] genre1 = genres[0] try: genre2 = genres[1] except: genre2 = ' ' try: directors = jddbRecord['directors'] except: directors = [' ',' '] director1 = directors[0] try: director2 = directors[1] except: director2 = ' ' toFirehose["actor1"] = actor1 toFirehose["actor2"] = actor2 toFirehose["director1"] = director1 toFirehose["director2"] = director2 toFirehose["genre1"] = genre1 toFirehose["genre2"] = genre2 toFirehose["rating"] = rating jtoFirehose = json.dumps(toFirehose) response = firehose.put_record( DeliveryStreamName=os.environ['DeliveryStreamName'], Record= { 'Data': jtoFirehose + '\n' } ) print(response) def lambda_handler(event, context): for record in event['Records']: if (record['eventName']) != 'REMOVE': recToFirehose(record['dynamodb']) return 'Successfully processed {} records.'.format(len(event['Records']))
後続の CloudFormation テンプレートで名称が決め打ちされているため、オブジェクト名はddb-to-firehose.zip
である必要があります。
2. CloudFormationのデプロイ
以下のテンプレートを用いて各種リソースをデプロイします。
折り畳み
AWSTemplateFormatVersion: 2010-09-09 Description: >- AWS CloudFormation: Parameters: DynamoDBTableName: Description: DynamoDB Table Name Type: String AllowedPattern: '[a-zA-Z0-9]*' MinLength: '1' MaxLength: '255' ConstraintDescription: must contain only alphanumeric characters LambdaCodeBucket: Description: S3 bucket containing the Lambda function code Type: String Resources: myDynamoDBTable: Type: 'AWS::DynamoDB::Table' Properties: TableName: !Ref DynamoDBTableName StreamSpecification: StreamViewType: NEW_IMAGE AttributeDefinitions: - AttributeName: Year AttributeType: N - AttributeName: Title AttributeType: S KeySchema: - AttributeName: Year KeyType: HASH - AttributeName: Title KeyType: RANGE ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 5 TimeToLiveSpecification: Enabled: True AttributeName: ExpireTime myS3Bucket: Type: 'AWS::S3::Bucket' Properties: PublicAccessBlockConfiguration: BlockPublicAcls: true BlockPublicPolicy: true IgnorePublicAcls: true RestrictPublicBuckets: true firehoseDeliveryStream: DependsOn: - deliveryPolicy Type: 'AWS::KinesisFirehose::DeliveryStream' Properties: DeliveryStreamName: !Ref DynamoDBTableName ExtendedS3DestinationConfiguration: BucketARN: !Join - '' - - 'arn:aws:s3:::' - !Ref myS3Bucket BufferingHints: IntervalInSeconds: '60' SizeInMBs: '1' CompressionFormat: UNCOMPRESSED Prefix: firehose/ RoleARN: !GetAtt deliveryRole.Arn deliveryRole: Type: 'AWS::IAM::Role' Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Sid: '' Effect: Allow Principal: Service: firehose.amazonaws.com Action: 'sts:AssumeRole' Condition: StringEquals: 'sts:ExternalId': !Ref 'AWS::AccountId' deliveryPolicy: Type: 'AWS::IAM::ManagedPolicy' Properties: Description: Managed policy for firehose Roles: - !Ref deliveryRole PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - 's3:AbortMultipartUpload' - 's3:GetBucketLocation' - 's3:GetObject' - 's3:ListBucket' - 's3:ListBucketMultipartUploads' - 's3:PutObject' Resource: - !Join - '' - - 'arn:aws:s3:::' - !Ref myS3Bucket - !Join - '' - - 'arn:aws:s3:::' - !Ref myS3Bucket - '*' lambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Sid: '' Effect: Allow Principal: Service: lambda.amazonaws.com Action: 'sts:AssumeRole' ddbToFirehose: Type: "AWS::Lambda::Function" Properties: Handler: "ddb-to-firehose.lambda_handler" Role: Fn::GetAtt: - "lambdaExecutionRole" - "Arn" Code: S3Bucket: !Ref LambdaCodeBucket S3Key: "ddb-to-firehose.zip" Runtime: "python3.6" Timeout: "25" Environment: Variables: DeliveryStreamName: !Ref DynamoDBTableName logGroup: Type: "AWS::Logs::LogGroup" Properties: LogGroupName: !Sub "/aws/lambda/${ddbToFirehose}" lambdaExecutionPolicy: Type: 'AWS::IAM::ManagedPolicy' Properties: Description: Managed policy for lambda function Roles: - !Ref lambdaExecutionRole PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - 'firehose:PutRecord' - 'firehose:PutRecordBatch' - 'firehose:UpdateDestination' Resource: !GetAtt - firehoseDeliveryStream - Arn - Effect: Allow Action: - 'logs:CreateLogStream' - 'logs:PutLogEvents' Resource: - !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:${logGroup}:*" - Effect: Allow Action: - 'dynamodb:DescribeStream' - 'dynamodb:GetRecords' - 'dynamodb:GetShardIterator' - 'dynamodb:ListStreams' Resource: !GetAtt - myDynamoDBTable - StreamArn EventSourceMapping: Type: "AWS::Lambda::EventSourceMapping" DependsOn: - lambdaExecutionPolicy Properties: EventSourceArn: !GetAtt - myDynamoDBTable - StreamArn FunctionName: !GetAtt - ddbToFirehose - Arn StartingPosition: "TRIM_HORIZON" Outputs: TableName: Value: !Ref myDynamoDBTable Description: Table name of the newly created DynamoDB table BucketName: Value: !Ref myS3Bucket Description: My s3 bucket
スタック作成時にパラメータを指定します。
- DynamoDBTableName
- このスタックで作成される DynamoDB テーブルの名称
- 今回は
Movies
にしました
- LambdaCodeBucket
- 先ほどスクリプトをアップロードしたバケット
今回はスタック名はtest-dynamodb-to-s3
で作成しました。
このスタックで作成されるリソースは以下の通りです。
論理 ID | タイプ | リソース名 |
---|---|---|
myDynamoDBTable | AWS::DynamoDB::Table | Movies(パラメータで指定した値) |
EventSourceMapping | AWS::Lambda::EventSourceMapping | なし |
ddbToFirehose | AWS::Lambda::Function | <スタック名>-ddbToFirehose-<ランダム文字列> |
lambdaExecutionPolicy | AWS::IAM::ManagedPolicy | <スタック名>-lambdaExecutionPolicy-<ランダム文字列> |
lambdaExecutionRole | AWS::IAM::Role | <スタック名>-lambdaExecutionRole-<ランダム文字列> |
logGroup | AWS::Logs::LogGroup | /aws/lambda/<スタック名>-ddbToFirehose-<ランダム文字列> |
firehoseDeliveryStream | AWS::KinesisFirehose::DeliveryStream | Movies(DynamoDBテーブルと同じ) |
deliveryPolicy | AWS::IAM::ManagedPolicy | <スタック名>-deliveryPolicy-<ランダム文字列> |
deliveryRole | AWS::IAM::Role | <スタック名>-deliveryRole-<ランダム文字列> |
myS3Bucket | AWS::S3::Bucket | <スタック名>-mys3bucket-<ランダム文字列> |
Lambda 関数
作成されたリソースを確認しておきます。
トリガーとして DynamoDB ストリームが設定された Lambda 関数が作成されています。コードは手順 1. で設定したものです。
AWS CLI で確認しておきます。
% aws lambda get-function --function-name <スタック名>-ddbToFirehose-1MELND04EJEIH { "Configuration": { "FunctionName": "<スタック名>-ddbToFirehose-1MELND04EJEIH", "FunctionArn": "arn:aws:lambda:ap-northeast-1:000000000000:function:<スタック名>-ddbToFirehose-1MELND04EJEIH", "Runtime": "python3.6", "Role": "arn:aws:iam::000000000000:role/<スタック名>-lambdaExecutionRole-8V65X35YC08Q", "Handler": "ddb-to-firehose.lambda_handler", "CodeSize": 1418, "Description": "", "Timeout": 25, "MemorySize": 128, "LastModified": "2020-09-14T09:03:03.747+0000", "CodeSha256": "6GUWMPcAmflOYaBdlxuXPcCowVjb0SSosCoScY664Fk=", "Version": "$LATEST", "Environment": { "Variables": { "DeliveryStreamName": "Movies" } }, "TracingConfig": { "Mode": "PassThrough" }, "RevisionId": "811fda34-0fd5-46ce-9c67-a48da86c653d", "State": "Active", "LastUpdateStatus": "Successful" }, "Code": { "RepositoryType": "S3", "Location": "https://awslambda-ap-ne-1-tasks.s3.ap-northeast-1.amazonaws.com/snapshots/000000000000/<スタック名>-ddbToFirehose-1MELND04EJEIH-a9e4d6a5-f6db-43e0-9cf7-fa13509dbe2b }, "Tags": { "aws:cloudformation:stack-name": "<スタック名>", "aws:cloudformation:stack-id": "arn:aws:cloudformation:ap-northeast-1:000000000000:stack/<スタック名>/0768f760-f669-11ea-a32f-0693e5f01c30", "aws:cloudformation:logical-id": "ddbToFirehose" } }
トリガーの設定(イベントソースマッピング)も確認しておきます。
% aws lambda get-event-source-mapping --uuid b83b49e8-5e50-4249-bb77-59970d8d149a { "UUID": "b83b49e8-5e50-4249-bb77-59970d8d149a", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/Movies/stream/2020-09-14T09:02:39.713", "FunctionArn": "arn:aws:lambda:ap-northeast-1:000000000000:function:<スタック名>-ddbToFirehose-1MELND04EJEIH", "LastModified": "2020-09-14T18:05:00+09:00", "LastProcessingResult": "No records processed", "State": "Enabled", "StateTransitionReason": "User action", "DestinationConfig": { "OnFailure": {} }, "MaximumRecordAgeInSeconds": -1, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": -1 }
DynamoDB テーブル
この時点ではアイテムは登録されておらず、テーブルは空です。
DynamoDB ストリームと TTL が有効になっています。
TTL で指定されている属性ExpireTime
は、後続の手順で DynamoDB テーブルにアイテムを追加する際に付加するものです。
こちらも AWS CLI で確認します。
% aws dynamodb describe-table --table-name Movies { "Table": { "AttributeDefinitions": [ { "AttributeName": "Title", "AttributeType": "S" }, { "AttributeName": "Year", "AttributeType": "N" } ], "TableName": "Movies", "KeySchema": [ { "AttributeName": "Year", "KeyType": "HASH" }, { "AttributeName": "Title", "KeyType": "RANGE" } ], "TableStatus": "ACTIVE", "CreationDateTime": "2020-09-14T18:02:39.713000+09:00", "ProvisionedThroughput": { "NumberOfDecreasesToday": 0, "ReadCapacityUnits": 5, "WriteCapacityUnits": 5 }, "TableSizeBytes": 0, "ItemCount": 0, "TableArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/Movies", "TableId": "d69d5d14-099b-4df6-9b13-9951d88e4139", "StreamSpecification": { "StreamEnabled": true, "StreamViewType": "NEW_IMAGE" }, "LatestStreamLabel": "2020-09-14T09:02:39.713", "LatestStreamArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/Movies/stream/2020-09-14T09:02:39.713" } }
TTL を確認する際は別のコマンドです。
% aws dynamodb describe-time-to-live --table-name Movies { "TimeToLiveDescription": { "TimeToLiveStatus": "ENABLED", "AttributeName": "ExpireTime" } }
DynamoDB ストリームを確認する際も別コマンドです。ストリームの ARN を確認するために一旦 list します。
% aws dynamodbstreams list-streams { "Streams": [ { "StreamArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/Movies/stream/2020-09-14T09:02:39.713", "TableName": "Movies", "StreamLabel": "2020-09-14T09:02:39.713" }, { "StreamArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/Movies/stream/2020-09-14T08:49:40.941", "TableName": "Movies", "StreamLabel": "2020-09-14T08:49:40.941" } ] }
最新のストリームの ARN を指定して describe します。
% aws dynamodbstreams describe-stream --stream-arn arn:aws:dynamodb:ap-northeast-1:000000000000:table/Movies/stream/2020-09-14T09:02:39.713 { "StreamDescription": { "StreamArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/Movies/stream/2020-09-14T09:02:39.713", "StreamLabel": "2020-09-14T09:02:39.713", "StreamStatus": "ENABLED", "StreamViewType": "NEW_IMAGE", "CreationRequestDateTime": "2020-09-14T18:02:39.713000+09:00", "TableName": "Movies", "KeySchema": [ { "AttributeName": "Year", "KeyType": "HASH" }, { "AttributeName": "Title", "KeyType": "RANGE" } ], "Shards": [ { "ShardId": "shardId-00000001600074162263-9d9e0ab8", "SequenceNumberRange": { "StartingSequenceNumber": "100000000007133185793" } } ] } }
Kinesis Data Firehose 配信ストリーム
Firehose のストリームです。設定画面が縦に長くて収まらないですが、ソースは設定なしで、宛先としてCloudFormation でデプロイされた S3 バケットが指定されています。
AWS CLI での確認結果です。
% aws firehose describe-delivery-stream --delivery-stream-name Movies { "DeliveryStreamDescription": { "DeliveryStreamName": "Movies", "DeliveryStreamARN": "arn:aws:firehose:ap-northeast-1:000000000000:deliverystream/Movies", "DeliveryStreamStatus": "ACTIVE", "DeliveryStreamEncryptionConfiguration": { "Status": "DISABLED" }, "DeliveryStreamType": "DirectPut", "VersionId": "1", "CreateTimestamp": "2020-09-14T18:03:29.367000+09:00", "Destinations": [ { "DestinationId": "destinationId-000000000001", "S3DestinationDescription": { "RoleARN": "arn:aws:iam::000000000000:role/<スタック名>-deliveryRole-76EH8KWJBO5W", "BucketARN": "arn:aws:s3:::<スタック名>-mys3bucket-1ounjvcs4v5ka", "Prefix": "firehose/", "BufferingHints": { "SizeInMBs": 1, "IntervalInSeconds": 60 }, "CompressionFormat": "UNCOMPRESSED", "EncryptionConfiguration": { "NoEncryptionConfig": "NoEncryption" }, "CloudWatchLoggingOptions": { "Enabled": false } }, "ExtendedS3DestinationDescription": { "RoleARN": "arn:aws:iam::000000000000:role/<スタック名>-deliveryRole-76EH8KWJBO5W", "BucketARN": "arn:aws:s3:::<スタック名>-mys3bucket-1ounjvcs4v5ka", "Prefix": "firehose/", "BufferingHints": { "SizeInMBs": 1, "IntervalInSeconds": 60 }, "CompressionFormat": "UNCOMPRESSED", "EncryptionConfiguration": { "NoEncryptionConfig": "NoEncryption" }, "CloudWatchLoggingOptions": { "Enabled": false }, "S3BackupMode": "Disabled" } } ], "HasMoreDestinations": false } }
3. DynamoDB テーブルへのアイテム追加
スクリプトを用いて DynamoDB テーブルへのアイテム追加を行います。
任意の環境で実行可能ですが、私は手元の端末から実行しました。
スクリプト
以下のスクリプトを用います。
ハイライト部の指定により、TTL の属性ExpireTIme
の挿入をしています。ここではアイテムの作成時刻から 1 時間で有効期限を迎えます。
######################################################################################### # Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Permission is hereby granted, free of charge, to any person obtaining a copy of this # software and associated documentation files (the "Software"), to deal in the Software # without restriction, including without limitation the rights to use, copy, modify, # merge, publish, distribute, sublicense, and/or sell copies of the Software, and to # permit persons to whom the Software is furnished to do so. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ######################################################################################### import os, sys, time, decimal from decimal import * import boto3 import json dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-1') table = dynamodb.Table('Movies') def loadfile(infile): jsonobj = json.load(open(infile)) lc = 1 for movie in jsonobj: lc += 1 CreateTime = int(time.time()) ExpireTime = CreateTime + (1* 60* 60) response = table.put_item( Item={ 'Year': decimal.Decimal(movie['year']), 'Title': movie['title'], 'info': json.dumps(movie['info']), 'CreateTime': CreateTime, 'ExpireTime': ExpireTime } ) if (lc % 10) == 0: print ("%d rows inserted" % (lc)) if __name__ == '__main__': filename = sys.argv[1] if os.path.exists(filename): # file exists, continue loadfile(filename) else: print ('Please enter a valid filename')
スクリプトの実行環境において、 boto3 を実行可能である必要があることに注意してください。
また、22-23行目でアイテムの追加対象の DynamoDB テーブルを定義しています。リージョンやテーブル名など、適宜必要に応じて修正してください。
データの元ネタ(JSON)
スクリプトにより DynamoDB テーブルに追加するデータは、moviedata.json
として用意されています。
以下のような形式になっています。
[ { "year": 2013, "title": "Rush", "info": { "directors": ["Ron Howard"], "release_date": "2013-09-02T00:00:00Z", "rating": 8.3, "genres": [ "Action", "Biography", "Drama", "Sport" ], "image_url": "http://ia.media-imdb.com/images/M/MV5BMTQyMDE0MTY0OV5BMl5BanBnXkFtZTcwMjI2OTI0OQ@@._V1_SX400_.jpg", "plot": "A re-creation of the merciless 1970s rivalry between Formula One rivals James Hunt and Niki Lauda.", "rank": 2, "running_time_secs": 7380, "actors": [ "Daniel Bruhl", "Chris Hemsworth", "Olivia Wilde" ] } }, { "year": 2013, "title": "Prisoners", "info": { "directors": ["Denis Villeneuve"], "release_date": "2013-08-30T00:00:00Z", "rating": 8.2, "genres": [ "Crime", "Drama", "Thriller" ], "image_url": "http://ia.media-imdb.com/images/M/MV5BMTg0NTIzMjQ1NV5BMl5BanBnXkFtZTcwNDc3MzM5OQ@@._V1_SX400_.jpg", "plot": "When Keller Dover's daughter and her friend go missing, he takes matters into his own hands as the police pursue multiple leads and the pressure mounts. But just how far will this desperate father go to protect his family?", "rank": 3, "running_time_secs": 9180, "actors": [ "Hugh Jackman", "Jake Gyllenhaal", "Viola Davis" ] } }, ---以下略---
以下から zip 形式でダウンロード可能です。展開すると 3.7MB あるので、一部のみ抽出して使用するのもアリかもしれません。
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/samples/moviedata.zip
スクリプトの実行
今回実行する端末では python3、boto3 は以下のバージョンです。
% python3 --version Python 3.7.3
% pip3 list Package Version --------------- ------- boto3 1.14.60 botocore 1.17.60 docutils 0.15.2 jmespath 0.10.0 pip 19.0.3 python-dateutil 2.8.1 s3transfer 0.3.3 setuptools 40.8.0 six 1.12.0 urllib3 1.25.10 wheel 0.33.1
スクリプトの引数として JSON ファイルを指定して実行します。
% python3 LoadMovieData.py moviedata.json 10 rows inserted 20 rows inserted 30 rows inserted 40 rows inserted …… 4610 rows inserted
DynamoDB テーブルへの追加の進捗状況が逐次表示されます。 JSON ファイルをそのまま使用すると結構なデータ量になりました。
DynamoDB には以下のようにアイテムが追加されています。
ExpireTIme
属性には (TTL)
の表示があります。ここでは UNIX 時間が指定されており、この時間を過ぎると削除されます。( バックグラウンドで処理され、期限切れから48時間以内に削除。)
(ここでは TTL が CreationTime
から 1時間後で設定されているので、翌朝覗いた際には跡形もなくアイテムが無くなっていました。)
4. S3 バケットの確認
DynamoDB ストリーム -> Lambda 関数 -> Firehose 配信ストリームを経由して、S3 バケットにはこのような形でアウトプットが出力されています。
ファイルの中身を抜粋すると、以下のような形式でテーブルの中身がエクスポートされています。
{"Year": "2013", "CreateTime": "1600080235", "Title": "Rush", "ExpireTime": "1600083835", "info": "{\"directors\": [\"Ron Howard\"], \"release_date\": \"2013-09-02T00:00:00Z\", \"rating\": 8.3, \"genres\": [\"Action\", \"Biography\", \"Drama\", \"Sport\"], \"image_url\": \"http://ia.media-imdb.com/images/M/MV5BMTQyMDE0MTY0OV5BMl5BanBnXkFtZTcwMjI2OTI0OQ@@._V1_SX400_.jpg\", \"plot\": \"A re-creation of the merciless 1970s rivalry between Formula One rivals James Hunt and Niki Lauda.\", \"rank\": 2, \"running_time_secs\": 7380, \"actors\": [\"Daniel Bruhl\", \"Chris Hemsworth\", \"Olivia Wilde\"]}", "actor1": "Daniel Bruhl", "actor2": " ", "director1": "Ron Howard", "director2": " ", "genre1": "Action", "genre2": "Biography", "rating": 8.3} {"Year": "2013", "CreateTime": "1600080236", "Title": "Prisoners", "ExpireTime": "1600083836", "info": "{\"directors\": [\"Denis Villeneuve\"], \"release_date\": \"2013-08-30T00:00:00Z\", \"rating\": 8.2, \"genres\": [\"Crime\", \"Drama\", \"Thriller\"], \"image_url\": \"http://ia.media-imdb.com/images/M/MV5BMTg0NTIzMjQ1NV5BMl5BanBnXkFtZTcwNDc3MzM5OQ@@._V1_SX400_.jpg\", \"plot\": \"When Keller Dover's daughter and her friend go missing, he takes matters into his own hands as the police pursue multiple leads and the pressure mounts. But just how far will this desperate father go to protect his family?\", \"rank\": 3, \"running_time_secs\": 9180, \"actors\": [\"Hugh Jackman\", \"Jake Gyllenhaal\", \"Viola Davis\"]}", "actor1": "Hugh Jackman", "actor2": " ", "director1": "Denis Villeneuve", "director2": " ", "genre1": "Crime", "genre2": "Drama", "rating": 8.2}
今回の構成では、 TTL による有効期限切れとなったアイテムは Lambda 関数の処理対象となっていない( Lambda 関数は Invocate されるが Firehose に受け渡さない)ため、新規に追加されたアイテムのみが S3 バケットに出力されています。
TTL により削除されたアイテムがストリームにキャプチャされた場合、Lambda に引き渡されるレコードイベントに以下のような項目が含まれます。この値を活用して、TTL による削除が行われたアイテムのみをエクスポート対象とする、ということも可能です。
"Records":[ { ... "userIdentity":{ "type":"Service", "principalId":"dynamodb.amazonaws.com" } ... } ]
5. Athena を用いた分析
S3 バケットに出力されたレコードに対して、Athena を用いてクエリを実行することができます。
任意のデータベースを選択し、以下を実行することでテーブルを作成します。LOCATION
は上記で確認した出力先バケットの任意のパスを指定してください。
CREATE EXTERNAL TABLE `movies`( `year` int, `createtime` int, `title` string, `expiretime` int, `info` string, `actor1` string, `actor2` string, `director1` string, `director2` string, `genre1` string, `genre2` string, `rating` double) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' LOCATION 's3://<スタック名>-mys3bucket-1ounjvcs4v5ka/firehose/2020/09/14/10/'
今回はdefault
データベース上でmovies
というテーブルを作成しました。
テーブルに対して任意のクエリを実行できます。例えば、genre1
の平均評価を確認するクエリは以下です。
SELECT genre1, avg(rating) as avg_rating FROM "default"."movies" group by genre1 order by avg_rating desc;
一通りの動作確認ができました。
終わりに
DynamoDB ストリームでキャプチャしたアイテムを、Lambda 関数と Kinesis Data Firehose 配信ストリームを通じて S3 バケットに出力する構成を試してみました。
Lambda 関数のコード、各種リソースの CloudFormation テンプレートだけでなく DynamoDB テーブルに追加するアイテムのサンプル(しかもそれなりに大規模)が用意されていたのも助かります。
DynamoDB ストリームの使用感を知りたい方の参考になれば幸いです。
以上、千葉(幸)がお送りしました。